Test out processing different partitions concurrently #221

Closed

Conversation

@Alexander-Blair (Contributor) commented Feb 21, 2021

The issue: #188

Adds a ConcurrentRunner, which creates a thread pool to orchestrate two or more consumers
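
For context, a minimal sketch of the shape such a runner could take (illustrative only, not the PR's actual code; the `Racecar::Runner` constructor arguments shown are assumptions):

```ruby
# Illustrative only: each thread builds and runs its own consumer, which then
# joins the consumer group as an independent member via the Kafka protocol.
# The Racecar::Runner constructor arguments below are assumptions.
class ConcurrentRunner
  def initialize(processor_class, config:, logger:, concurrency: 2)
    @processor_class = processor_class
    @config = config
    @logger = logger
    @concurrency = concurrency
  end

  def run
    threads = Array.new(@concurrency) do
      Thread.new do
        Racecar::Runner.new(@processor_class.new, config: @config, logger: @logger).run
      end
    end
    threads.each(&:join)
  end
end
```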

  • Adds the code in a way that does not affect existing APIs (you never know what your users will decide to depend on)
  • The signal handling is a bit more painful, as you can't stop threads from within a trap context. The approach is similar to the one outlined here (see the sketch after this list). The Ruby documentation mentions that IO.pipe is not supported on all platforms, but I can't find which platforms those are; worst case, it could be replaced with the first example from the above article.
  • Includes quite a big overhaul of the existing integration test so that we have a fresh topic for each spec, and we also clean up the topics at the end.
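
For reference, here is a minimal sketch of the self-pipe pattern mentioned in the signal-handling point above, assuming the trap handler does nothing but write a byte to the pipe; the thread bodies are placeholders rather than real consumer loops:

```ruby
require "thread"

reader, writer = IO.pipe
stop_requested = false

%w[INT TERM].each do |signal|
  # The only work done inside the trap context is a non-blocking write.
  Signal.trap(signal) { writer.write_nonblock(".") rescue nil }
end

runners = 2.times.map do
  Thread.new do
    # Each thread would run its own consumer loop here.
    sleep 0.1 until stop_requested
  end
end

# Block outside the trap context until a signal arrives, then stop the threads.
IO.select([reader])
stop_requested = true
runners.each(&:join)
```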

What hasn't been done yet?

  • Extensive testing
  • Some kind of benchmarking in a semi-realistic setup. Few possible scenarios:
    • Consuming a pile of messages and writing one straight out again
    • Consuming a pile of messages and syncing it into a database

Outstanding question(s):

  • How do I create a pre-release version to test what I have so far?
  • Should there be a hard limit on max concurrency?

Couple of other points:

  • When we spin up multiple runners, we end up duplicating the consumer config that we add here: https://github.com/zendesk/racecar/blob/master/lib/racecar/runner.rb#L115
  • Sometimes when running the entire test suite, I'll get Errno::EMFILE: Too many open files. This seems to be fixed by raising the open file descriptor limit with ulimit -n 1024; on my machine it was set to 256 by default.

@dasch (Contributor) commented Feb 22, 2021

Thanks, it's great to see this being worked on!

I think it's best to start by deciding on the overall architecture, based on what the desired capability should be.

It looks like the architecture you're proposing has each thread working as a fully independent consumer with no shared state between threads, and each consumer coordinating with the total set of group members using the Kafka protocol. The main benefit here is that you can run multiple consumers on a single host without allocating as much extra memory as you otherwise would. Is that a correct assumption?

If so, I'm not sure the benefit outweighs the added complexity. For one, I suspect that memory use will often be dominated by the workload rather than by long-lived application objects: Kafka consumers load tons of messages into memory, and the deserialization process (at both the protocol and application levels) incurs even more allocations. So I think further sharing of long-lived object allocations has limited practical benefit.

If we were to focus on increasing memory sharing, I think a multi-process parent-child forking setup a la Resque or Unicorn would be more reliable. Since there's no shared state anyway, and each thread does its own IO, there's really little benefit to using threads at all.
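
To illustrate, here's a hedged sketch of a fork-per-worker supervisor in that style; `run_consumer` is a hypothetical stand-in for whatever builds and runs a single consumer:

```ruby
# Illustrative fork-per-worker supervisor, not Racecar code. The parent forks
# N children, each running its own single-threaded consumer; copy-on-write
# keeps the preloaded application code shared between processes.
worker_count = 4

pids = Array.new(worker_count) do
  fork do
    run_consumer # hypothetical: builds and runs one consumer until told to stop
  end
end

# Forward termination signals to the children, then wait for them to exit.
%w[INT TERM].each do |signal|
  Signal.trap(signal) do
    pids.each { |pid| Process.kill(signal, pid) rescue nil }
  end
end

pids.each { |pid| Process.wait(pid) }
```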

However, if we were to change focus from running entire consumers concurrently to concurrent execution within a single consumer then I think threads make much more sense – but the complexity of implementation increases as well.

I think the JVM client does some of this, so perhaps looking into its capabilities and design would be worthwhile. But off the top of my head, I think it could make sense to allow a consumer to process messages concurrently, in the following way (rough sketches follow the list):

  • A thread pool is instantiated, and each thread creates an instance of the consumer class.
  • We need to set up a mapping of topic-partition to consumer thread, either ahead of time, or lazily as new messages come in. This will of course be a likely source of unfairness, so it’ll be interesting to see what algorithms or libraries could help out here (although I’m always wary of adding new dependencies.)
  • When new batches arrive, they're grouped by partition and placed on a thread-safe queue associated with the owning thread.
  • Each thread will pull a partition-batch from its queue and pass it to the user code (either the entire batch in the case of process_batch, or each message in turn in the case of process).
  • Committing offsets could be tricky – I’m actually not sure how best to interface with ruby-rdkafka, but my intuition is that there are two options: either have each thread block on committing offsets for the specific partitions it owns, if there’s an API for that, or push status reports to a singleton thread that only does commits – but we then need to control admission into the processing threads so as to not get unbounded growth of in-flight messages.
  • Similarly, controlling the produce side could also be tricky.
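
To make the queueing idea concrete, a rough, hypothetical sketch of the per-partition routing; the class and method names are illustrative and not an existing API:

```ruby
require "thread"

class PartitionedDispatcher
  def initialize(consumer_class, worker_count:)
    @workers = Array.new(worker_count) do
      queue = Queue.new
      thread = Thread.new do
        consumer = consumer_class.new
        # Each worker owns one consumer instance and drains only its own queue.
        while (batch = queue.pop)
          batch.each { |message| consumer.process(message) }
        end
      end
      { queue: queue, thread: thread }
    end
  end

  # Route every batch for a given topic-partition to the same worker so that
  # messages within a partition are still processed in order.
  def dispatch(topic, partition, batch)
    worker = @workers[[topic, partition].hash % @workers.size]
    worker[:queue] << batch
  end

  def shutdown
    @workers.each { |w| w[:queue] << nil } # nil sentinel ends each worker loop
    @workers.each { |w| w[:thread].join }
  end
end
```

Routing by hashing the topic-partition keeps per-partition ordering, but it is exactly the kind of static assignment that can be unfair when partitions have uneven load, as noted above.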

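And a similarly hedged sketch of the singleton-commit-thread option, using a bounded queue for admission control; the actual offset commit call into ruby-rdkafka is deliberately left as a comment, since which API to use there is the open question:

```ruby
require "thread"

# SizedQueue gives us admission control: producers block once 100 reports are
# in flight, which bounds the growth of unprocessed/uncommitted messages.
commit_queue = SizedQueue.new(100)

committer = Thread.new do
  while (report = commit_queue.pop)
    topic, partition, offset = report
    # A real implementation would commit this offset via ruby-rdkafka here.
    puts "would commit #{topic}/#{partition} at offset #{offset}"
  end
end

# A processing thread reports an offset once its batch has been handled:
commit_queue << ["greetings", 0, 42]

commit_queue << nil # sentinel to stop the committer
committer.join
```
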
All in all, I think there’s lots of potential, but it’s not a trivial problem.

If the goal is a more immediate increase in memory friendliness, then I think implementing better support for a forking mode is the way to go.

@Alexander-Blair (Contributor, Author)

I'll give the forking approach a go! The assumptions about the approach were correct: these consumers would be independent of each other.

@Alexander-Blair (Contributor, Author)

Will close this and open a separate PR for a process-forking version.
